Idee map-reduce
/**
 * Grid-based approximate outlier count, printed to standard output.
 *
 * <p>Each point is assigned to a square cell of side {@code D / (2 * sqrt(2))} and the
 * number of points per non-empty cell is computed. For every non-empty cell two
 * neighbourhood sizes are then derived, both including the cell itself:
 * <ul>
 *   <li>N3: points in the 3x3 block of cells centred on the cell;</li>
 *   <li>N7: points in the 7x7 block of cells centred on the cell.</li>
 * </ul>
 * Points of a cell with {@code N7 <= M} are counted as sure outliers; points of a cell
 * with {@code N3 <= M} and {@code N7 > M} are counted as uncertain. After the two totals,
 * the first {@code K} cells in ascending order of size are printed.
 *
 * <p>The per-cell counts are collected to the driver as a map that is captured by the
 * step-B closure, so the number of non-empty cells must fit in memory.
 *
 * @param points the input points; the key and the value of each pair are used as the
 *               two coordinates
 * @param D      distance parameter from which the cell side is derived; a value of zero
 *               makes the cell side zero and the cell indices meaningless
 * @param M      threshold compared against N3 and N7
 * @param K      number of cells to print, taken in ascending order of size
 */
public static void MRApproxOutliersBISCA1(JavaPairRDD<Float, Float> points, float D, int M, int K){
    // Step A: map every point to its cell and count the points per cell.
    float lambda = (float) (D/(2*Math.sqrt(2)));
    JavaPairRDD<Tuple2<Integer, Integer>, Integer> cellCount = points.mapToPair(
        (point) -> {
            Tuple2<Integer, Integer> cell = new Tuple2<>(
                    (int) Math.floor(point._1() / lambda),
                    (int) Math.floor(point._2() / lambda));
            return new Tuple2<Tuple2<Integer, Integer>, Integer>(cell, 1);
        }
    ).reduceByKey(Integer::sum);

    // Step B: attach (cell size, N3, N7) to every non-empty cell.
    // The full cell -> size map is brought to the driver so each task can look up neighbours.
    Map<Tuple2<Integer, Integer>, Integer> cellSizes = cellCount.collectAsMap();
    JavaPairRDD<Tuple2<Integer,Integer>, Tuple3<Integer,Integer,Integer>> outliersPoint = cellCount.mapToPair(
        (cell) -> {
            int x = cell._1()._1();
            int y = cell._1()._2();
            int n3 = 0;
            int n7 = 0;
            // The offsets (0, 0) are included, so the cell itself contributes to both sums.
            for (int i = -3; i <= 3; i++) {
                for (int j = -3; j <= 3; j++) {
                    // Empty cells are absent from the map and count as zero.
                    int neighbourSize = cellSizes.getOrDefault(new Tuple2<>(x + i, y + j), 0);
                    n7 += neighbourSize;
                    if (Math.abs(i) <= 1 && Math.abs(j) <= 1) {
                        n3 += neighbourSize;
                    }
                }
            }
            Tuple3<Integer, Integer, Integer> sizes = new Tuple3<>(cell._2(), n3, n7);
            return new Tuple2<>(cell._1(), sizes);
        }
    );

    // Aggregate on the driver; long accumulators so the totals cannot wrap around.
    List<Tuple2<Tuple2<Integer,Integer>, Tuple3<Integer,Integer,Integer>>> finalList = outliersPoint.collect();
    long outliers = 0;
    long uncertains = 0;
    for (Tuple2<Tuple2<Integer,Integer>, Tuple3<Integer,Integer,Integer>> elem : finalList) {
        int size = elem._2()._1();
        int n3 = elem._2()._2();
        int n7 = elem._2()._3();
        if (n7 <= M) {
            outliers += size;
        } else if (n3 <= M) {
            // Reached only when n7 > M, i.e. the cell is neither a sure outlier nor a sure inlier.
            uncertains += size;
        }
    }
    System.out.println("Number of sure outliers = "+ outliers);
    System.out.println("Number of uncertain points = "+uncertains);

    // Re-key the cells by their size so that sortByKey orders them by ascending size.
    JavaPairRDD<Integer, Tuple2<Integer, Integer>> ordercell = cellCount.mapToPair(
        (cell) -> new Tuple2<Integer, Tuple2<Integer, Integer>>(cell._2(), cell._1())
    );
    List<Tuple2<Integer, Tuple2<Integer, Integer>>> firstKElements = ordercell.sortByKey().take(K);
    for (Tuple2<Integer, Tuple2<Integer, Integer>> e : firstKElements) {
        System.out.println("Cell: "+e._2()+" Size="+e._1());
    }
}